typedef struct
{
- GInputStream *result_stream;
+ GBytes *result_buf;
gboolean done;
GError **error;
}
{
FetchUriSyncData *data = user_data;
- (void)_ostree_fetcher_request_finish ((OstreeFetcher*)object,
- result, NULL, &data->result_stream,
- data->error);
+ (void)_ostree_fetcher_request_to_membuf_finish ((OstreeFetcher*)object,
+ result, &data->result_buf,
+ data->error);
data->done = TRUE;
}
GError **error)
{
gboolean ret = FALSE;
- const guint8 nulchar = 0;
- g_autoptr(GMemoryOutputStream) buf = NULL;
g_autoptr(GMainContext) mainctx = NULL;
FetchUriSyncData data;
g_assert (error != NULL);
- data.result_stream = NULL;
+ memset (&data, 0, sizeof (data));
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
data.done = FALSE;
data.error = error;
- _ostree_fetcher_request_async (fetcher, mirrorlist, filename, 0, max_size,
- OSTREE_FETCHER_DEFAULT_PRIORITY, cancellable,
- fetch_uri_sync_on_complete, &data);
+ _ostree_fetcher_request_to_membuf (fetcher, mirrorlist, filename,
+ add_nul ? OSTREE_FETCHER_REQUEST_NUL_TERMINATION : 0,
+ max_size, OSTREE_FETCHER_DEFAULT_PRIORITY,
+ cancellable, fetch_uri_sync_on_complete, &data);
while (!data.done)
g_main_context_iteration (mainctx, TRUE);
- if (!data.result_stream)
+ if (!data.result_buf)
{
if (allow_noent)
{
goto out;
}
- buf = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
- if (g_output_stream_splice ((GOutputStream*)buf, data.result_stream,
- G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
- cancellable, error) < 0)
- goto out;
-
- if (add_nul)
- {
- if (!g_output_stream_write ((GOutputStream*)buf, &nulchar, 1, cancellable, error))
- goto out;
- }
-
- if (!g_output_stream_close ((GOutputStream*)buf, cancellable, error))
- goto out;
-
ret = TRUE;
- *out_contents = g_memory_output_stream_steal_as_bytes (buf);
+ *out_contents = g_steal_pointer (&data.result_buf);
out:
if (mainctx)
g_main_context_pop_thread_default (mainctx);
- g_clear_object (&(data.result_stream));
+ g_clear_pointer (&data.result_buf, (GDestroyNotify)g_bytes_unref);
return ret;
}
#include "config.h"
+#include <gio/gio.h>
#include <gio/gfiledescriptorbased.h>
#include <gio/gunixoutputstream.h>
#define LIBSOUP_USE_UNSTABLE_REQUEST_API
SoupRequest *request;
- gboolean is_stream;
+ gboolean is_membuf;
+ OstreeFetcherRequestFlags flags;
GInputStream *request_body;
char *out_tmpfile;
GOutputStream *out_stream;
soup_message_headers_append (msg->request_headers, key, value);
}
- if (pending->is_stream)
+ if (pending->is_membuf)
{
soup_request_send_async (pending->request,
cancellable,
*/
if (pending->out_stream)
{
+ if ((pending->flags & OSTREE_FETCHER_REQUEST_NUL_TERMINATION) > 0)
+ {
+ const guint8 nulchar = 0;
+ gsize bytes_written;
+
+ if (!g_output_stream_write_all (pending->out_stream, &nulchar, 1, &bytes_written,
+ cancellable, error))
+ goto out;
+ }
+
if (!g_output_stream_close (pending->out_stream, cancellable, error))
goto out;
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
- pending->state = OSTREE_FETCHER_STATE_COMPLETE;
- if (fstatat (pending->thread_closure->tmpdir_dfd,
- pending->out_tmpfile,
- &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
+ if (!pending->is_membuf)
{
- glnx_set_error_from_errno (error);
- goto out;
+ if (fstatat (pending->thread_closure->tmpdir_dfd,
+ pending->out_tmpfile,
+ &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
+ {
+ glnx_set_error_from_errno (error);
+ goto out;
+ }
}
+ pending->state = OSTREE_FETCHER_STATE_COMPLETE;
+
/* Now that we've finished downloading, continue with other queued
* requests.
*/
session_thread_process_pending_queue (pending->thread_closure);
- if (stbuf.st_size < pending->content_length)
+ if (!pending->is_membuf)
{
- g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
- goto out;
- }
- else
- {
- g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
- pending->thread_closure->total_downloaded += stbuf.st_size;
- g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+ if (stbuf.st_size < pending->content_length)
+ {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
+ goto out;
+ }
+ else
+ {
+ g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+ pending->thread_closure->total_downloaded += stbuf.st_size;
+ g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+ }
}
ret = TRUE;
{
if (!finish_stream (pending, cancellable, &local_error))
goto out;
- g_task_return_pointer (task,
- g_strdup (pending->out_tmpfile),
- (GDestroyNotify) g_free);
+ if (pending->is_membuf)
+ {
+ g_task_return_pointer (task,
+ g_memory_output_stream_steal_as_bytes ((GMemoryOutputStream*)pending->out_stream),
+ (GDestroyNotify) g_bytes_unref);
+ }
+ else
+ {
+ g_task_return_pointer (task,
+ g_strdup (pending->out_tmpfile),
+ (GDestroyNotify) g_free);
+ }
remove_pending_rerun_queue (pending);
}
else
if (SOUP_IS_REQUEST_HTTP (object))
{
msg = soup_request_http_get_message ((SoupRequestHTTP*) object);
- if (msg->status_code == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE)
+ if (!pending->is_membuf &&
+ msg->status_code == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE)
{
// We already have the whole file, so just use it.
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
(void) g_input_stream_close (pending->request_body, NULL, NULL);
- if (pending->is_stream)
- {
- g_task_return_pointer (task,
- g_object_ref (pending->request_body),
- (GDestroyNotify) g_object_unref);
- }
- else
- {
- g_task_return_pointer (task,
- g_strdup (pending->out_tmpfile),
- (GDestroyNotify) g_free);
- }
+ g_task_return_pointer (task,
+ g_strdup (pending->out_tmpfile),
+ (GDestroyNotify) g_free);
remove_pending_rerun_queue (pending);
goto out;
}
pending->content_length = soup_request_get_content_length (pending->request);
- if (!pending->is_stream)
+ if (!pending->is_membuf)
{
int oflags = O_CREAT | O_WRONLY | O_CLOEXEC;
int fd;
goto out;
}
pending->out_stream = g_unix_output_stream_new (fd, TRUE);
-
- g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
- g_hash_table_add (pending->thread_closure->output_stream_set,
- g_object_ref (pending->out_stream));
- g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
-
- g_input_stream_read_bytes_async (pending->request_body,
- 8192, G_PRIORITY_DEFAULT,
- cancellable,
- on_stream_read,
- g_object_ref (task));
}
else
{
- g_task_return_pointer (task,
- g_object_ref (pending->request_body),
- (GDestroyNotify) g_object_unref);
- remove_pending_rerun_queue (pending);
+ pending->out_stream = g_memory_output_stream_new_resizable ();
}
-
+
+ g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
+ g_hash_table_add (pending->thread_closure->output_stream_set,
+ g_object_ref (pending->out_stream));
+ g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
+
+ g_input_stream_read_bytes_async (pending->request_body,
+ 8192, G_PRIORITY_DEFAULT,
+ cancellable,
+ on_stream_read,
+ g_object_ref (task));
+
out:
if (local_error)
{
g_object_unref (task);
}
-void
+static void
_ostree_fetcher_request_async (OstreeFetcher *self,
GPtrArray *mirrorlist,
const char *filename,
OstreeFetcherRequestFlags flags,
+ gboolean is_membuf,
guint64 max_size,
int priority,
GCancellable *cancellable,
pending->thread_closure = thread_closure_ref (self->thread_closure);
pending->mirrorlist = g_ptr_array_ref (mirrorlist);
pending->filename = g_strdup (filename);
+ pending->flags = flags;
pending->max_size = max_size;
- pending->is_stream = (flags & OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL) == 0;
+ pending->is_membuf = is_membuf;
task = g_task_new (self, cancellable, callback, user_data);
g_task_set_source_tag (task, _ostree_fetcher_request_async);
(GDestroyNotify) g_object_unref);
}
+void
+_ostree_fetcher_request_to_tmpfile (OstreeFetcher *self,
+ GPtrArray *mirrorlist,
+ const char *filename,
+ guint64 max_size,
+ int priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ _ostree_fetcher_request_async (self, mirrorlist, filename, 0, FALSE,
+ max_size, priority, cancellable,
+ callback, user_data);
+}
+
gboolean
-_ostree_fetcher_request_finish (OstreeFetcher *self,
- GAsyncResult *result,
- char **out_filename,
- GInputStream **out_stream,
- GError **error)
+_ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
+ GAsyncResult *result,
+ char **out_filename,
+ GError **error)
{
GTask *task;
OstreeFetcherPendingURI *pending;
g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
- /* Special dance to implement
- enum FetchResult {
- Filename(String path),
- Membuf(uint8[])
- } in Rust terms
- */
task = (GTask*)result;
pending = g_task_get_task_data (task);
if (!ret)
return FALSE;
- if (pending->is_stream)
- {
- g_assert (out_stream);
- *out_stream = ret;
- }
- else
- {
- g_assert (out_filename);
- *out_filename = ret;
- }
+ g_assert (!pending->is_membuf);
+ g_assert (out_filename);
+ *out_filename = ret;
+
+ return TRUE;
+}
+
+void
+_ostree_fetcher_request_to_membuf (OstreeFetcher *self,
+ GPtrArray *mirrorlist,
+ const char *filename,
+ OstreeFetcherRequestFlags flags,
+ guint64 max_size,
+ int priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ _ostree_fetcher_request_async (self, mirrorlist, filename, flags, TRUE,
+ max_size, priority, cancellable,
+ callback, user_data);
+}
+
+gboolean
+_ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
+ GAsyncResult *result,
+ GBytes **out_buf,
+ GError **error)
+{
+ GTask *task;
+ OstreeFetcherPendingURI *pending;
+ gpointer ret;
+
+ g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
+ g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
+
+ task = (GTask*)result;
+ pending = g_task_get_task_data (task);
+
+ ret = g_task_propagate_pointer (task, error);
+ if (!ret)
+ return FALSE;
+
+ g_assert (pending->is_membuf);
+ g_assert (out_buf);
+ *out_buf = ret;
return TRUE;
}
+
guint64
_ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{
OstreeObjectType objtype;
gboolean free_fetch_data = TRUE;
- if (!_ostree_fetcher_request_finish (fetcher, result, &temp_path, NULL, error))
+ if (!_ostree_fetcher_request_to_tmpfile_finish (fetcher, result, &temp_path, error))
goto out;
ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
g_debug ("fetch of %s%s complete", checksum_obj,
fetch_data->is_detached_meta ? " (detached)" : "");
- if (!_ostree_fetcher_request_finish (fetcher, result, &temp_path, NULL, error))
+ if (!_ostree_fetcher_request_to_tmpfile_finish (fetcher, result, &temp_path, error))
{
if (g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
{
g_debug ("fetch static delta part %s complete", fetch_data->expected_checksum);
- if (!_ostree_fetcher_request_finish (fetcher, result, &temp_path, NULL, error))
+ if (!_ostree_fetcher_request_to_tmpfile_finish (fetcher, result, &temp_path, error))
goto out;
fd = openat (_ostree_fetcher_get_dfd (fetcher), temp_path, O_RDONLY | O_CLOEXEC);
else
expected_max_size = 0;
- _ostree_fetcher_request_async (pull_data->fetcher, mirrorlist,
- obj_subpath, OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL,
- expected_max_size,
- is_meta ? OSTREE_REPO_PULL_METADATA_PRIORITY
- : OSTREE_REPO_PULL_CONTENT_PRIORITY,
- pull_data->cancellable,
- is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
+ _ostree_fetcher_request_to_tmpfile (pull_data->fetcher, mirrorlist,
+ obj_subpath, expected_max_size,
+ is_meta ? OSTREE_REPO_PULL_METADATA_PRIORITY
+ : OSTREE_REPO_PULL_CONTENT_PRIORITY,
+ pull_data->cancellable,
+ is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
}
static gboolean
}
else
{
- _ostree_fetcher_request_async (pull_data->fetcher,
- pull_data->content_mirrorlist,
- deltapart_path, OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL,
- size,
- OSTREE_FETCHER_DEFAULT_PRIORITY,
- pull_data->cancellable,
- static_deltapart_fetch_on_complete,
- fetch_data);
+ _ostree_fetcher_request_to_tmpfile (pull_data->fetcher,
+ pull_data->content_mirrorlist,
+ deltapart_path, size,
+ OSTREE_FETCHER_DEFAULT_PRIORITY,
+ pull_data->cancellable,
+ static_deltapart_fetch_on_complete,
+ fetch_data);
pull_data->n_outstanding_deltapart_fetches++;
}
}